package com.offcn.bigdata.spark.p4

import com.offcn.bigdata.spark.p3.{MyAccumulator, _05AccumulatorOps}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * RDD sorting operations.
  *
  * Transformations: sortByKey and sortBy — related much like groupByKey and groupBy.
  * Action: takeOrdered.
  */
object _01SortOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
            // getSimpleName on a Scala object's class yields "_01SortOps$";
            // strip the trailing '$' so the Spark UI shows a clean app name.
            .setAppName(_01SortOps.getClass.getSimpleName.stripSuffix("$"))
            .setMaster("local[*]")

        val sc = new SparkContext(conf)

        val lines = sc.parallelize(List(
            "a is abstraction in spark is",
            "shared spark that abstraction be is in spark abstraction"
        ), 2)
        // val, not var: never reassigned after construction.
        val pairs = lines.flatMap(_.split("\\s+")).map(word => (word, 1))
        val ret = pairs.reduceByKey(_ + _)
//        sortByKeyOps(ret)
//        sortByOps(ret)
        // Top 2 words by descending count. Ordering.by(...).reverse is the
        // idiomatic replacement for a hand-rolled anonymous Ordering whose
        // compare swapped x and y.
        val takeArr = ret.takeOrdered(2)(Ordering.by[(String, Int), Int](_._2).reverse)
        takeArr.foreach(println)

        sc.stop()
    }

    /** Sorts (word, count) pairs by count descending via sortBy and prints them. */
    private def sortByOps(ret: RDD[(String, Int)]): Unit = {
        ret.sortBy(t => t._2, ascending = false, numPartitions = 1)
            .foreach(println)
    }

    /**
      * Sorts by count descending via sortByKey: swap to (count, word) so the
      * count becomes the key, sort the keys, then print.
      *
      * With numPartitions = 1 the result is globally ordered; otherwise the
      * ordering holds only within each partition. A global sort on a single
      * partition is discouraged for large production datasets.
      */
    private def sortByKeyOps(ret: RDD[(String, Int)]): Unit = {
        val sbkRDD = ret.map { case (word, count) => (count, word) }
            .sortByKey(
                ascending = false, // descending order
                numPartitions = 1  // 1 partition => global order; avoid in production for big data
            )
        // Debug aid kept for reference: dump each partition's contents.
        //            .mapPartitionsWithIndex((index, partition) => {
        //                val list = partition.toList
        //                println(s"partition <${index}> elements: ${list}")
        //                list.toIterator
        //            })
        //            sbkRDD.count()
        sbkRDD.foreach { case (count, word) =>
            println(word + "--->" + count)
        }
    }
}
